Kafka 4, fantastique ?

Damien Lucas

Software Engineer

Logo Mirakl Blue Horizontal

dlucasd

dlucas
Nos Clients
Mirakl Labs (R&D)
350+
42%
2%
membres
full remote
turnover
Applications
Datastore
Platform
Cloud provider
Tooling
Notre Stack Technique

Commits par release

0 400 800 1200 1600 2.0.0 07/2018 2.5.0 3.0.0 09/2021 3.5.0 4.0.0 03/2025 4.1.0 09/2025 596 783 1552

Kafka Improvement Proposals (KIP)

heatMap

  • 📘 Migrer vers 4.x

  • 🧹 Grand ménage

  • 💸 Coûts et complexité à maintenir

  • ❌ Java 8 et Scala 2.12 plus supportés

  • ☕️ Kafka Clients & Kafka Streams → Java 11

  • ☕️ Broker, Connect, Tools, MM2 → Java 17

Protocole client-broker

  • ❌ Retrait des anciennes API client-broker

  • ⚙️ Broker 4.0 ⇄ Clients ≥ 2.1, Clients 4.0 ⇄ Brokers ≥ 2.1

  • 👨‍💻 Ops : activer la métrique DeprecatedRequestsPerSec sur les brokers (3.7+), si >0 alors impact

Format de message

  • ❌ Formats de message v0/v1 plus supportés en écriture

  • ✉️ Le “nouveau” format v2 devient l’unique format côté écriture

Migration vers Log4j2

  • ❌ Sortir définitivement de Log4j 1.x

  • ⚙️ Outil de migration log4j-transform-cli

  • 💻 config-file convert -i log4j1 -o log4j2 <inputFile> <outputFile>

Config par défaut modifiée

  • ⏱️ linger.ms: 0 ms → 5 ms (batching ↑, latence +~5 ms)

  • ⏱️ (log.)message.timestamp.after.max.ms: ∞ → 1 h (protège des timestamps “futurs”)

  • 📝 (log.)segment.bytes (min): 14 B → 1 MB (évite micro-segments)

  • num.recovery.threads.per.data.dir: 1 → 2 (rétablissement plus rapide)

zookeeper

zookeeper

  • 🗳️ Consensus distribué, élection de leader

  • 🤝 Coordination du cluster (Broker en vie ? Qui est controller ? Pannes, découvertes …​)

  • 💾 Stockage des métadonnées (topics, partitions, ACL, …​)

Avec ZooKeeper

zooKeeperCluster

KRaft

Avec KRaft

KRaft

Migration

  • 🏥 Avoir un cluster sain (pas de partition offline, réplication stable …​)

  • ⬆️ Monter en 3.9.x

  • 🧪 Migration en environnement de tests

  • (et tester la procédure de rollback!)

  • 📈 Monitorer les logs et JMX

4.0.0

Métriques clients via broker

Broker







Collecte télémétrie
Producer • Consumer • Admin

1. GetTelemetrySubscriptions
Request
2. PushTelemetryRequest
Métriques du client
(3.7+, activé par défaut)
Observabilité
3. Transfert via
Metrics plugin
Format : OpenTelemetry MetricsData v1
Métriques applicatives :
- somme
- jauge
Metrics local = new Metrics(new MetricConfig());
Sensor messageSensor = local.sensor("message-count-sensor");

LinkedHashMap<String, String> tags = new LinkedHashMap<>();
tags.put("component", "my-app");
MetricName metricName = new MetricName(
    "total-messages-processed",
    "application",
    "Total messages processed",
    tags
);

messageSensor.add(metricName, new CumulativeCount());
KafkaMetric metric = local.metrics().get(metricName);

consumer.registerMetricForSubscription(metric);
consumer.unregisterMetricFromSubscription(metric);

auto.offset.reset

TOPIC
0
1
2
3
5
4
6
7
8
9
10



CONSUMER
earliest (default)
latest
by_duration=<duration>

Format ISO-8601

  • PnDTnHnMn.nS

  • Exemples:

    • P30D → 30 jours

    • PT12H → 12 heures

    • P1DT30M → 1 jour et 30 minutes

Rebalance

CONSUMER A
#0 #1
CONSUMER B
#2
CONSUMER C
x2
x2
Revoke #1
x2
x3
x2
x2
x2
x2
x2
#1
Bilan :
- 36 échanges
- 2 stop the world
Heartbeat
JoinGroup
SyncGroup

CONSUMER A
#0 #1
CONSUMER B
#2
CONSUMER C
x2
Revoke #1
x2
x2
x3
x3
#1
Bilan :
- 14 échanges
ConsumerGroupHeartbeat

throughputRebalance

Consumer Group Protocol

  • 🔁 Rebalance incrémental

  • 🎯 Géré par le coordinator, facilite le débug

  • ⚙️ Activé par défaut côté serveur, côté client : group.protocol=consumer

  • ⚖️ Compromis : usage CPU côté serveur, empreinte mémoire plus élevée

  • 💻 Outils d’admin : kafka-groups.sh, kafka-consumer-groups.sh

Streams rebalance protocol

  • 🧪 Early access en 4.1.0

  • 🧩 Se base sur le Consumer Group Protocol

  • 🔀 Même principe mais pour l’assignation des tâches Kafka Streams

Queues

en early access

4.1.0

Métriques des plugins Kafka

Qu’est-ce qu’un plugin Kafka ?

  • Broker: Authorizer, CreateTopicPolicy, ClientQuotaCallback, ReplicaSelector

  • Producer: Serializer, Partitioner, ProducerInterceptor

  • Consumer: Deserializer, ConsumerInterceptor

  • Connect: Converter, Transform, Predicate, Connector, Task

Implémentation

public class MyInterceptor<K, V>
  implements ProducerInterceptor<K, V>, Monitorable {

  private Sensor sensor;

  @Override
  public void withPluginMetrics(PluginMetrics metrics) {
    sensor = metrics.sensor("onSend");
    MetricName rate  = metrics.metricName("rate",  "Average number of calls per second.", new LinkedHashMap<>());
    MetricName total = metrics.metricName("total", "Total number of calls.",             new LinkedHashMap<>());
    sensor.add(rate,  new Rate());
    sensor.add(total, new CumulativeCount());
  }

  @Override
  public ProducerRecord<K,V> onSend(ProducerRecord<K,V> record) {
    sensor.record();
    return record;
  }
}

Migration métriques consumer

  • Problème:

    • Producer: topic.a.b

    • Consumer: topic.a.btopic_a_b

  • Solution:

    • 4.x: Ajoute topic.a.b (nouveau) + conservation topic_a_b (déprécié)

    • 5.0: Seulement topic.a.b 🎉

OAuth2 grant type

  • ❗️ Problème: seul client_credentials supporté

    • Secret statique = Potentiel risque de sécurité

  • 🔐 Solution: jwt-bearer (RFC 7523)

    • Authentification via JWT signé

Kafka Connect: support plugins multi-versions

  • Problème: 1 version par cluster

  • Difficulté à gérer les montées de versions, rollback etc …​

  • 🎛️ Nouveauté: Support de plusieurs versions de plugins sur un cluster

Installation

opt/
  plugins/
    foo-connector-1.8/
      foo-connector-1.8.jar
      foo-dependencies-1.0.jar
    foo-connector-1.9/
      foo-connector-1.9.jar
      foo-dependencies-1.1.jar
    bar-connector/
      bar-connector-1.0.jar

Utilisation

  • Spécifier la version souhaitée

    • "xxx.plugin.version": "[3.5,)"

  • Versions visibles de manière distincte dans l’API

Queues for Kafka

  • Nouvelle manière de consommer un topic

Rappel consommateur classique

consumerClassic

Rappel Point à point / Queue

  • 👤 Message consommé par un seul consommateur

  • 🔄 Traitement asynchrone

  • ✅ Acquittement au message

  • 🗑️ Suppression du message après traitement

Share Groups

  • 🔧 API identique au consumer group

  • ➗ Permet la consommation de messages d'une seule partition par plusieurs consommateurs

  • ✅ Acquittement au message

shareConsumer

Acquittement

  • share.acknowledgement.mode=implicit (default) : acquittement automatique lors du poll()

  • share.acknowledgement.mode=explicit

Explicite

@️KafkaListener(
    topics = "order-processing",
    groupId = "order-processors",
    containerFactory = "shareKafkaListenerContainerFactory"
)
public void processOrder(OrderEvent event,
                         ShareAcknowledgment acknowledgment) {
  try {
    if (!isValid(event)) {
      log.info("Rejet du message, ne sera pas re-deliveré");
      acknowledgment.reject();
      return;
    }

    orderService.process(event);
    acknowledgment.acknowledge();

  } catch (TransientException e) {
    log.info("Erreur temporaire, rejeu possible");
    acknowledgment.release();
  } catch (PermanentException e) {
    log.info("Rejet du message, ne sera pas re-deliveré");
    acknowledgment.reject();
  }
}

Available
Acquired
.poll()
delivery count ++
Acknowledged
.acknowledge()
.reject()
Archived
delivery limit reached
.release()
delivery count < delivery limit

Nouvelles propriétés

  • share.delivery.count.limit: 5

  • share.record.lock.duration.ms: 30000 (30 sec)

Cas d’usage

  • ✉️ Nécessité d’acquittement au message avec rejeu/rejet

  • 📈 Mise à l’échelle sans garantie d’ordre

  • 🔁 Faciliter la migration de système type queue vers Kafka

Preview

  • 🚀 Production ready pour la 4.2.0

  • 💻 Outil d’admin : kafka-share-groups.sh

  • 📨 Intégration de Dead Letter Queue

  • ❓ Garantie d’ordre en réflexion

Support framework

Spring

  • Version: Spring Kafka 4.0.0-RC1

  • 🛠️ Passage total à KRaft, suppression ZooKeeper

  • 🧪 Embedded Kafka pour les tests sur KRaft

  • 🔄 Nouveau protocole rebalance (propriété Spring Boot)

  • 👥 Support Kafka Queues

  • 📊 Observabilité enrichie avec les métriques clients

  • 🎯 Alignement Spring Framework 7: Jackson 3, Spring Retry natif

  • ⚠️ RC, release pour fin d’année

Quarkus

  • Version: Quarkus 3.24+

  • Nouveau protocole rebalance ✅

  • KRaft ✅

  • Métriques clients ❌

  • Kafka Queues ❌

Releases & Compatibilité

  • ~3 Releases/An

  • Release mineure peut contenir des features majeures !

    • 2.8.0 : KRaft intro

    • 3.6.0 : Tiered Storage

  • Support : 3 dernières versions

4.2.0

  • 📅 Sortie pour janvier/février 2026

  • 📦 28 KIP figées

  • 📊 10 KIP sur l’observabilité

  • 📨 Dead letter queue native dans Kafka Streams

Merci 🙏

qr code feedback

Feedback

qr code github

Github

Bonus

Topology

source
processor
stream
processor
sink
processor

Implémentation exemple

public class LoggingProcessorWrapper<KIn, VIn, KOut, VOut>
  implements ProcessorWrapper<KIn, VIn, KOut, VOut> {

...

    private static class LoggingProcessor<KIn, VIn, KOut, VOut>
       implements Processor<KIn, VIn, KOut, VOut>, ProcessorContextAware {

        ...

        @Override
        public void init(ProcessorContext<KOut, VOut> context) {
            delegate.init(context);
        }

        @Override
        public void process(Record<KIn, VIn> record) {
            Instant start = Instant.now();
            logger.info("Entrée dans le processeur");

            delegate.process(record);

            var duration = Duration.between(start, Instant.now());
            logger.info("Sortie du processeur : durée={}", duration.toMillis());
        }

        @Override
        public void close() {
            delegate.close();
        }
    }
}



________________________________
application.properties:

processor.wrapper.class=com.mirakl.LoggingProcessorWrapper

KStreams Foreign Key Extractor

Avant :

// Topic orders, key : {clientId, orderId}, value : {productId, quantity}
// Topic clients, key : {clientId}, value : {name, address}

// Étape intermédiaire : Ajouter orderId à la valeur
KTable<OrderKey, OrderWithClient> ordersWithClient = orders.mapValues(
    (key, order) -> new OrderWithClient(key.getClientId(), order.getProductId(), order.getQuantity()),
);

KTable<OrderKey, EnrichedOrder> enriched = ordersWithClient.join(
    clients,
    // Extracteur de clé étrangère : uniquement la valeur
    orderWithClient -> new ClientKey(orderWithClient.getClientId()),
    (orderWithClient, client) -> EnrichedOrder.of(orderWithClient.getProductId(), orderWithClient.getQuantity(), client.getName(), client.getAddress()),
);

Après :

KTable<OrderKey, EnrichedOrder> enriched = orders.join(
    clients,
    // Extracteur de clé étrangère : utilise clé (OrderKey) et valeur (Order)
    (orderKey, order) -> new ClientKey(orderKey.getClientId()),
    (order, client) -> EnrichedOrder.of(order.getProductId(), order.getQuantity(), client.getName(), client.getAddress()),
);

KRaft architecture

zoomKRaft

Protection contre les deadlocks dans le Producer

  • Problème: flush() dans callback de send() → Deadlock sur ioThread

    • Timeout confusant, app bloquée, diagnostic difficile

  • Solution: KafkaException immédiate

Amélioration gestion d’erreurs transactionnelles dans le Producer

  • Problème: Exceptions floues pour transactions (Retriable? Fatal? Abortable?)

  • Solution: 4 catégories + hiérarchie claire

    • Retriable (ex: TimeoutException) : Retry auto

      • RefreshRetriable (ex: UnknownTopic) : Refresh metadata + retry

    • Abortable : Abort transaction (ex: CommitFailed)

    • ApplicationRecoverable : Restart app/producer (ex: ProducerFenced)

    • InvalidConfig : Fix config (ex: AuthError)

groupes

Outils de gestion des groupes

Nouvel outil kafka-groups.sh

$ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list
GROUP                   TYPE          PROTOCOL
old-consumer-group      Classic       consumer
new-consumer-group      Consumer      consumer
connect-cluster         Classic       connect
share-group             Share         share
schema-registry         Classic       sr
simple-consumer-group   Classic

Adaptation des outils existants

  • kafka-consumer-groups.sh

  • kafka-share-groups.sh